
Binary Classification with VowpalWabbit on Criteo Dataset

VW-native text input

Read dataset

import pyspark.sql.types as T
from pyspark.sql import functions as F

schema = T.StructType(
    [
        T.StructField("label", T.IntegerType(), True),
        *[T.StructField("i" + str(i), T.IntegerType(), True) for i in range(1, 13)],
        *[T.StructField("s" + str(i), T.StringType(), True) for i in range(26)],
    ]
)

df = (
    spark.read.format("csv")
    .option("header", False)
    .option("delimiter", "\t")
    .schema(schema)
    .load("wasbs://publicwasb@mmlspark.blob.core.windows.net/criteo_day0_1k.csv.gz")
)
# print basic dataset info
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()
display(df)
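
Before reformatting, it can be worth a quick sanity check of the class balance. A minimal sketch using standard PySpark (not part of the original pipeline):

# optional sanity check: inspect the label distribution
df.groupBy("label").count().orderBy("label").show()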

Reformat into VW-native format

See the VW documentation for details on the input text format.

# create VW string format
cols = [
    F.col("label"),
    F.lit("|"),
    *[F.col("i" + str(i)) for i in range(1, 13)],
    *[F.col("s" + str(i)) for i in range(26)],
]

df = df.select(F.concat_ws(" ", *cols).alias("value"))
display(df)
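
Each value row is now a plain-text VW example: the label, a bar introducing the default namespace, then space-separated tokens. VW treats each bare token after the bar as a feature name with an implicit value of 1, so both the integer columns and the hashed categorical columns become indicator features. An illustrative row (values invented, not taken from the dataset):

0 | 4 41 102 0 2 17 2 112 3 1 5 8 68fd1e64 80e26c9b fb936136 ...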

Split the dataset into train and test

train, test = df.randomSplit([0.6, 0.4], seed=1)
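
Optionally, confirm the split sizes; exact counts depend on the sample and the seed:

# optional sanity check on the split
print("train rows: " + str(train.count()))
print("test rows: " + str(test.count()))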

Model Training

from synapse.ml.vw import VowpalWabbitGeneric

# number of partitions determines data parallelism
train = train.repartition(2)

model = VowpalWabbitGeneric(
    numPasses=5,
    useBarrierExecutionMode=False,
    passThroughArgs="--holdout_off --loss_function logistic --link logistic",
).fit(train)
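
passThroughArgs forwards arbitrary VW command-line flags. As a hedged sketch (the flag values below are illustrative, not tuned for Criteo), you could widen the feature hash table with -b and set an explicit learning rate with -l:

# illustrative only: -b controls the hash-table bit width, -l the learning rate
model_tuned = VowpalWabbitGeneric(
    numPasses=5,
    useBarrierExecutionMode=False,
    passThroughArgs="--holdout_off --loss_function logistic --link logistic -b 22 -l 0.5",
).fit(train)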

Model Prediction

predictions = model.transform(test)

# VW with --link logistic outputs a probability; cast it to double, and
# recover the true label from the first character of the VW-format line
predictions = predictions.withColumn(
    "prediction", F.col("prediction").cast("double")
).withColumn("label", F.substring("value", 1, 1).cast("double"))

display(predictions)
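
Since --link logistic makes VW emit a probability, you may want a hard 0/1 decision before computing classification metrics. A minimal sketch assuming a 0.5 cutoff (the threshold and the predicted_label column name are illustrative, not from the original notebook):

# hypothetical: threshold the predicted probability into a hard label
predictions = predictions.withColumn(
    "predicted_label", F.when(F.col("prediction") > 0.5, 1.0).otherwise(0.0)
)

If you do this, pass predicted_label as scoredLabelsCol below instead of the raw probability.
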
from synapse.ml.train import ComputeModelStatistics

metrics = ComputeModelStatistics(
    evaluationMetric="classification", labelCol="label", scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)
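
To use the metrics outside Spark, for example to log them, the single-row result can be collected locally. A minimal sketch:

# collect the one-row metrics DataFrame into pandas (optional follow-up step)
metrics_pd = metrics.toPandas()
print(metrics_pd.T)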